Scoobi is centered around the idea of a distributed collection, which is implemented by the DList (distributed list) class. In many ways, DList objects are similar to normal Scala List objects: they are parameterized by a type and they provide methods that can be used to produce new DList objects, often parameterized by higher-order functions. For example:
// Converting a List[Int] to a List[String] keeping only evens
val stringList = intList filter { _ % 2 == 0 } map { _.toString }
// Converting a DList[Int] to a DList[String] keeping only evens
val stringDList = intDList filter { _ % 2 == 0 } map { _.toString }
There are, however, two important differences. First, unlike a Scala List object, the contents of DList objects are not stored on the JVM heap but are stored elsewhere, typically in HDFS. Second, calling DList methods will not immediately result in data being generated in HDFS. This is because, behind the scenes, Scoobi implements a staging compiler. The purpose of DList methods is to construct a graph of data transformations. The act of persisting a DList then triggers the compilation of that graph into one or more MapReduce jobs and their execution.
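As a minimal sketch of this staging behaviour, reusing intDList from the example above, the following lines only build transformations and do not run anything:
// These calls only add nodes to Scoobi's transformation graph; no MapReduce job runs yet
val evens: DList[Int]      = intDList filter { _ % 2 == 0 }
val strings: DList[String] = evens map { _.toString }
// Only persisting one of these DLists (shown in the word count example below)
// triggers compilation into MapReduce jobs and their execution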
In summary, DList objects essentially provide two abstractions:

1. A DList object abstracts the storage of data and files in HDFS;
2. Calling methods on DList objects to transform and manipulate them abstracts the mapper, combiner, reducer and sort-and-shuffle phases of MapReduce.

Let's take a step-by-step look at the simple word count example from above. The complete application for word count looks like this:
import com.nicta.scoobi.Scoobi._

object WordCount extends ScoobiApp {
  def run() {
    val lines: DList[String] = fromTextFile(args(0))

    val counts: DList[(String, Int)] = lines.flatMap(_.split(" "))
                                            .map(word => (word, 1))
                                            .groupByKey
                                            .combine(_+_)

    persist(toTextFile(counts, args(1)))
  }
}
Our word count example is implemented by the object WordCount, which extends ScoobiApp. This is a convenience in Scoobi that avoids having to create configuration objects and automatically handles the command line arguments intended for Hadoop. The remaining arguments are available as args.
Within run, the first task is to construct a DList representing the data located at the input directory. In this situation, because the input data are simple text files, we can use the fromTextFile method, which takes our input directory as an argument and returns a DList[String] object. Here our DList object is a distributed collection where each element is a line from the input data, and it is assigned to lines.
The second task is to compute a DList of word counts given all the lines of text from our input data. This is implemented in four steps:
1. A flatMap is performed on lines. Like List's flatMap, a parameterizing function is supplied which takes a given line (a String) as its input and returns zero or more Strings as its result. In this case, that function is the method split, which splits the input string (a line) into a collection of words based on the occurrence of spaces. The result of the flatMap is therefore another DList[String], representing a distributed collection of words.

2. A map is performed on the distributed collection of words. Like List's map, a parameterizing function is supplied which takes a given word (a String) as its input and returns another value. In this case the supplied function takes the input word and returns a pair: the word and the value 1. The resulting object is a new distributed collection of type DList[(String, Int)].

3. A groupByKey is performed on the (String, Int) distributed collection. groupByKey has no direct counterpart in List (although there is a groupBy defined on DLists). groupByKey must be called on a key-value DList object, otherwise the program will not type check. The effect of groupByKey is to collect all distributed collection values with the same key. In this case the DList object is of type (String, Int), so a new DList object of type (String, Iterable[Int]) will be returned. That is, the counts for the same words will be grouped together.

4. To get the total count for each word, a combine is performed. combine also has no counterpart in List, but its semantics are to take a DList[(K, Iterable[V])] and return a DList[(K, V)] by reducing all the values. It is parameterized by a function of type (V, V) => V that must be associative. In our case we are simply performing addition to sum all the counts.
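As a sketch, the same pipeline can be written with the intermediate DList values named, making the type produced by each step explicit (these are just the expressions from the word count example pulled apart):
val words:   DList[String]                  = lines flatMap { _.split(" ") }
val pairs:   DList[(String, Int)]           = words map { word => (word, 1) }
val grouped: DList[(String, Iterable[Int])] = pairs.groupByKey
val counts:  DList[(String, Int)]           = grouped.combine(_+_)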
The final task is to take the counts object, which represents counts for each word, and persist it. In this case we will simply persist it as a text file, whose path is specified by the second command line argument, using toTextFile. Note that toTextFile is used within persist. Although not demonstrated in this example, persist takes a variable number of arguments, each of which specifies what DList is being persisted and how.
Until persist is called, our application will only be running on the local client. The act of calling persist, along with the DList(s) to be persisted, will trigger Scoobi's staging compiler to take the sequence of DList transformations and turn them into one or more Hadoop MapReduce jobs. In this example Scoobi will generate a single MapReduce job, which is then executed:
- flatMap and map will become part of a mapper task;
- groupByKey will occur as a consequence of the sort-and-shuffle phase;
- combine will become part of both a combiner and a reducer task.

The word count example is one of a number of examples included with Scoobi. The top level directory examples contains a number of self-contained tutorial-like examples, as well as a guide to building and deploying them. This is an additional starting point for learning and using Scoobi.
We have already seen a number of DList methods, such as map and flatMap. These methods are parallel operations in that they are performed in parallel by Hadoop across the distributed data set. The DList trait implements parallel operations for many of the methods that you would find in the standard Scala collections (a short sketch using a few of them follows the list):
map
flatMap
filter
filterNot
collect
partition
flatten
distinct
++
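For example, here is a minimal sketch of a few of these operations, assuming a hypothetical DList[Int] named numbers:
val numbers: DList[Int] = ...

val evens:    DList[Int]    = numbers filter { _ % 2 == 0 }                  // keep even values
val nonZeros: DList[Int]    = numbers filterNot { _ == 0 }                   // drop zeros
val labels:   DList[String] = numbers collect { case n if n > 0 => "+" + n } // keep and transform positives
val all:      DList[Int]    = evens ++ nonZeros.distinct                     // concatenate two DLists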
All of these methods are built upon the primitive parallel operation parallelDo. Unlike the other DList methods, parallelDo provides a less functional interface and requires the user to implement a DoFn object:
def parallelDo[B](dofn: DoFn[A, B]): DList[B]

trait DoFn[A, B] {
  def setup(): Unit
  def process(input: A, emitter: Emitter[B]): Unit
  def cleanup(emitter: Emitter[B]): Unit
}
Because the DoFn object has an interface that is closely aligned with the Hadoop mapper and reducer task APIs, it allows greater flexibility and control beyond what the collections-style APIs provide. For example, a DoFn object can maintain state where the other APIs cannot:
// Fuzzy top 10 - each mapper task will only emit the top 10 integers it processes
val ints: DList[Int] = ...

val top10ints: DList[Int] = ints.parallelDo(new DoFn[Int, Int] {
  // Mutable state maintained across calls to process within a single task
  val top = scala.collection.mutable.Set.empty[Int]

  def setup() {}

  def process(input: Int, emitter: Emitter[Int]) {
    if (top.size < 10) {
      top += input
    } else if (input > top.min) {
      top -= top.min
      top += input
    }
  }

  def cleanup(emitter: Emitter[Int]) { top foreach { emitter.emit(_) } }
})
Whilst the parallelDo and DoFn APIs provide greater flexibility, it is best practice to use the collections-style APIs where possible.
We have already seen the groupByKey method. DList also has a groupBy method that allows you to specify how the key is determined:
case class Person(name: String, age: Int)

val people: DList[Person] = ...

val peopleByAge: DList[(Int, Iterable[Person])] = people.groupBy(_.age)
The grouping methods abstract Hadoop's sort-and-shuffle phase. As such, it is possible to have more control over this phase by using the Grouping type class. This allows functionality such as secondary sorting to be implemented. Refer to the Grouping section for a more detailed explanation.
The combine method is Scoobi's abstraction of Hadoop's combiner functionality. For best results, combine should be called immediately after a groupBy or groupByKey method, as in the word count example.
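For example, here is a small sketch, reusing the people DList from above, that counts how many people there are of each age by calling combine directly after the grouping step:
val ages:         DList[(Int, Int)] = people map { p => (p.age, 1) }
val countsPerAge: DList[(Int, Int)] = ages.groupByKey.combine(_+_)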
DList objects are merely nodes in a graph describing a series of data transformations we want to perform. However, at some point we need to specify what the inputs and outputs to that computation are. We have already seen this in the previous example with fromTextFile(...) and persist(toTextFile(...)). The former is an example of loading data and the latter is an example of persisting data.
There are many ways of creating a new DList by loading data. Data can be loaded from files of various formats (e.g. text, sequence files, Avro files). Similarly, there are many ways in which a DList can be persisted. The Input and Output section provides a complete listing of all loading and persisting mechanisms.
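For instance, a minimal load-transform-persist round trip using only the text-file mechanisms already shown (the paths are placeholders):
// Each element of the DList is one line of the input text files
val events: DList[String] = fromTextFile("hdfs://path/to/input")

// Remove duplicate lines and write the result back to HDFS as text
persist(toTextFile(events.distinct, "hdfs://path/to/output"))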
There are two parts to persisting:

1. specifying how a DList object is to be persisted (e.g. with toTextFile);
2. calling persist, which triggers the evaluation of the DList object's computation.

For example, to persist a DList to a text file we could write:
val rankings: DList[(String, Int)] = ...
persist(toTextFile(rankings, "hdfs://path/to/output"))
It is important to note that a call to persist is the mechanism for triggering the computation of a DList and all its dependencies. Until persist is called, a DList is simply a specification for some distributed computation. persist can of course bundle together multiple DLists, allowing it to be aware of any shared computations:
val rankings: DList[(String, Int)] = ...
val rankings_reverse: DList[(Int, String)] = rankings.map(_.swap)
val rankings_example: DList[(Int, String)] = rankings_reverse.groupByKey.map{ case (ranking, items) => (ranking, items.head) }
persist(toTextFile(rankings, "hdfs://path/to/output"),
toTextFile(rankings_reverse, "hdfs://path/to/output-reverse"),
toTextFile(rankings_example, "hdfs://path/to/output-example"))